NSQ 是一個高效、可靠的即時分散式訊息處理系統,以去中心化的設計提供了高可用性和水平擴展能力。它適合用於需要低延遲和高吞吐量的訊息傳遞,如:微服務架構。
NSQ 採用 Publish/Subscribe 機制,Consumer 會訂閱某個特定的 主題(Topic),當 Producer 發送該 Topic 的 訊息(Message) 時,NSQ 會將訊息傳送給 Consumer。
NSQ 由以下三個元件組成:
Topic 是用來分類 Message 的, Producer 可以根據 Message 的類型發送至不同的 Topic,Consumer 也可以根據所需來訂閱某些 Topic 的 Message。每一個 Topic 就是一個 流(Stream),它會將 Producer 發送的 Message 進行緩衝,防止大量 Message 阻塞的發生。
從上面的敘述可以得知,Topic 是 Consumer 與 Producer 溝通的介面,格式為字串,限制使用字元 a
到 z
、A
到 Z
、0
到 9
、-
及 _
,長度的部份至少要有 1
個字元,但不得超過 64
字元。下方是幾個 NSQ Topic 的範例:
order_created
order-update
在每個 Topic 下,會有一個或多個 通道(Channel), 而一個 Channel 會有一個或多個 Consumer,每當 Producer 發送了一個 Message 時,在該 Topic 下的 所有 Channel 都會獲得該 Message 的副本,Channel 會將該 Message 分配給 其中一個 Consumer,達到負載平衡的效果。
注意:如果一個 Topic 底下沒有 Channel,那麼 Message 會 Queue 在 Topic 內,直到 Channel 建立。
在終端機輸入下方指令以便從 Docker Hub 下載 NSQ 的 Docker Image:
$ docker pull nsqio/nsq
我們會用 Docker 來架設三個元件,為了讓它們之間可以進行通訊,故需要建立一個 Docker Bridge Network 來建立通訊的橋樑:
$ docker network create <NETWORK_NAME>
建立好之後,透過下方指令先將 nsqlookupd 架設起來:
$ docker run --name <NSQ_LOOKUPD_NAME> --network <NETWORK_NAME> -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
架設好 nsqlookupd 之後,透過下方指令將 nsqd 架設起來,並連接到 nsqlookupd:
$ docker run --name <NSQD_NAME> --network <NETWORK_NAME> -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=<NSQD_NAME> --lookupd-tcp-address=<NSQ_LOOKUPD_NAME>:4160
最後,透過下方指令將 nsqadmin 架設起來,並連接到 nsqlookupd:
$ docker run --name <NSQ_ADMIN_NAME> --network <NETWORK_NAME> -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=<NSQ_LOOKUPD_NAME>:4161
透過 nsqd 提供的 HTTP API 即可對它進行操作,透過 Postman 進行測試,以 POST
方法存取 http://localhost:4151/pub?topic=test,並傳送 Hello NSQ!
作為請求主體,會看到下方回應顯示 OK
:
透過瀏覽器打開 http://localhost:4171 進入 nsqadmin 的 Web 介面並點選上方「Streams」,會看到「Topic Message Queue」有一個「NSQd Host」且它的「Messages」為 1
:
接下來,我們要運用前一篇的內容來實作 NSQ Transporter,讓我們可以用跟其他 Transporter 一樣的開發風格來使用 NSQ。
首先,安裝 NSQ 官方推出的 Node.js Client - nsqjs,我們會基於它來進行實作:
$ npm install nsqjs
另外,還需要安裝它的型別定義檔:
$ npm install @types/nsqjs -D
注意:本篇文章僅會使用部分功能,如果有興趣的話可以參考官方文件。
注意:本次僅會實作 Event-based 的情境,且實作內容不建議使用於生產環境,僅作為示範使用。
建立一個 NsqServer
繼承 Server
並實作 CustomTransportStrategy
:
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
export class NsqServer extends Server implements CustomTransportStrategy {
listen(callback: (...args: unknown[]) => any) {
callback();
}
close() {}
}
接著,定義 NsqServer
的 options
參數,名為 NsqOption
,它提供建立 Consumer 可以帶入的 ReaderConnectionConfigOptions
選項以及收到訊息後用來反序列化的 deserializer
:
import { ConsumerDeserializer } from '@nestjs/microservices';
import { ReaderConnectionConfigOptions } from 'nsqjs';
export interface NsqOption {
deserializer?: ConsumerDeserializer;
consumer?: ReaderConnectionConfigOptions;
}
我們要讓 NsqServer
在啟動時抓取所有套用 @EventPattern
的 Handler,並根據帶入的 Pattern 來建立對應的 Consumer 實例,但因為建立 Consumer 除了需要 Topic 外,還需要指定 Channel,所以需要定義 @EventPattern
的參數格式:
export interface NsqPattern {
topic: string;
channel: string;
}
在使用上就會像下方 AppController
一樣,帶入符合 NsqPattern
的物件:
import { Controller } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern({
topic: 'order_created',
channel: 'test',
})
orderCreated(data: { msg: string; }) {
console.log('orderCreated', data);
}
}
有了取得 Topic 與 Channel 的方式之後,還需要定義 Context 讓處理訊息的 handleEvent
可以接收,這裡我們定義 NsqContext
並提供 getTopic
與 getChannel
方法:
import { BaseRpcContext } from '@nestjs/microservices';
export class NsqContext extends BaseRpcContext {
getTopic(): string {
return this.getArgByIndex(0);
}
getChannel(): string {
return this.getArgByIndex(1);
}
}
由於 handleEvent
只接收 ReadPacket
格式的訊息,如果使用 Consumer 本身回應的 Message 就不會吻合,所以需要透過 deserializer
來進行反序列化,但原生預設的 deserializer
為 IncomingRequestDeserializer
,不太符合我們目前的使用情境,所以需要自行設計一個 deserializer
作為預設值。新增一個 NsqInboundMessageDeserializer
來將 Consumer 回應的 Message
轉換成 IncomingEvent
以符合 ReadPacket
格式:
import { ConsumerDeserializer, IncomingEvent } from '@nestjs/microservices';
import { Message } from 'nsqjs';
import { NsqOption } from './nsq';
export class NsqInboundMessageDeserializer implements ConsumerDeserializer {
deserialize(message: Message, options?: NsqOption): IncomingEvent {
return {
data: message.json(),
pattern: options,
};
}
}
最後,就可以跟 nsqjs 做整合,調整 NsqServer
的內容,定義 generateConsumer
方法來產生 Consumer 實例,再定義 registerConsumer
來讓 Consumer 持續收 Message,一旦收到 Message 就會透過 deserializer
反序列化成內部使用的訊息格式,再透過 handleEvent
將訊息交給 Server
來處理,如此一來,就可以讓符合 Pattern 的 Handler 收到該訊息,最後就是定義 bindEvents
來抓取所有套用 @EventPattern
的 Handler 並產生 Consumer:
import { Server, CustomTransportStrategy, IncomingEvent } from '@nestjs/microservices';
import { Reader } from 'nsqjs';
import { NsqOption, NsqPattern, NsqContext } from './nsq';
import { NsqInboundMessageDeserializer } from './nsq-deserializer';
export class NsqServer extends Server implements CustomTransportStrategy {
private readonly consumers: Reader[] = [];
constructor(private readonly options?: NsqOption) {
super();
// 使用 `NsqInboundMessageDeserializer` 當作預設值
this.initializeDeserializer({
...this.options,
deserializer: this.options?.deserializer ?? new NsqInboundMessageDeserializer(),
});
}
listen(callback: (...args: unknown[]) => any) {
this.bindEvents();
callback();
}
close() {
// 關閉所有 Consumer
this.consumers.forEach((consumer) => {
consumer.close();
});
}
private bindEvents() {
this.messageHandlers.forEach((handler, pattern) => {
if (!handler.isEventHandler) {
return;
}
const nsqPattern: NsqPattern = JSON.parse(pattern);
const consumer = this.generateConsumer(nsqPattern);
this.registerConsumer(consumer, nsqPattern);
this.consumers.push(consumer);
});
}
private generateConsumer(params: NsqPattern) {
const { topic, channel } = params;
return new Reader(topic, channel, this.options?.consumer);
}
private registerConsumer(consumer: Reader, pattern: NsqPattern) {
consumer.connect();
consumer.on('message', async (message) => {
const result = this.deserializer.deserialize(message, pattern) as IncomingEvent;
const context = new NsqContext([pattern.topic, pattern.channel]);
await this.handleEvent(JSON.stringify(pattern), result, context);
message.finish();
});
}
}
到目前為止已經完成 Strategy 的部份,可以實際套用來測試看看,在 main.ts
使用該 Strategy,並設定 consumer
的 nsqdTCPAddresses
為 localhost:4150
:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { NsqServer } from './transporters';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new NsqServer({
consumer: {
nsqdTCPAddresses: ['localhost:4150'],
},
}),
});
await app.listen();
}
bootstrap();
透過 Postman 進行測試,使用 POST
方法存取 http://localhost:4151/pub?topic=order_created,並指定主體資料為 { "msg": "created" }
:
此時會在微服務應用程式的終端機看到收到的訊息內容:
建立 NsqClient
並繼承 ClientProxy
,根據上一篇內容所述,會需要實作 connect
、close
、publish
以及 dispatchEvent
:
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
import { Writer } from 'nsqjs';
export class NsqClient extends ClientProxy {
connect(): Promise<Writer> {}
close() {}
protected publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void {
throw new Error('no implementation');
}
protected dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T> {}
}
注意:由於我們的範例並沒有實作 Request-response 模式,故
publish
是拋出Error
。
如果希望 Producer 可以自行設定連線資訊,以及希望可以設置 serializer
,那麼可以設置 NsqOption
當作 NsqClient
的 options
參數內容:
import { ProducerSerializer } from '@nestjs/microservices';
import { ConnectionConfigOptions } from 'nsqjs';
export interface NsqOption {
serializer?: ProducerSerializer;
producer?: {
connection?: ConnectionConfigOptions;
nsqdHost?: string;
nsqdPort?: number;
};
}
接著,調整 NsqClient
的內容,在建構階段呼叫 initializeSerializer
來初始化 serializer
。由於 ClientProxy
會在每次呼叫 emit
與 send
時都去執行 connect
來進行連線,所以我們需要在 connect
方法做一些處理,確保 Producer 可以被重用,而不是每次都重新建立連線。當 Producer 實例不存在時,會透過 generateProducer
來產生 Producer 實例並保存起來,再透過 connectToNsq
方法來建立連線;反之,如果存在 Producer 實例,就會直接回傳該實例。當開發者呼叫 NsqClient
的 emit
方法時,會經由 dispatchEvent
來發送訊息,所以我們會在這個方法內透過 normalizePattern
來轉換 Pattern,並透過 serializer
序列化訊息,最後再透過 Producer 的 publish
將訊息發送出去:
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
import { Writer } from 'nsqjs';
import { NsqOption } from './nsq';
export class NsqClient extends ClientProxy {
private producer: Writer | null = null;
constructor(private readonly options?: NsqOption) {
super();
this.initializeSerializer({
...this.options,
});
}
async connect(): Promise<Writer> {
if (this.producer) {
return this.producer;
}
this.producer = this.generateProducer();
return this.connectToNsq(this.producer);
}
close() {
// 關閉 Producer
this.producer?.close();
}
protected publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void {
throw new Error('no implementation');
}
protected dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T> {
const pattern = this.normalizePattern(packet.pattern);
const serializedMessage = this.serializer.serialize(packet);
this.producer?.publish(pattern, serializedMessage.data);
return Promise.resolve(serializedMessage.data);
}
private generateProducer() {
const host = this.options?.producer?.nsqdHost ?? 'localhost';
const port = this.options?.producer?.nsqdPort ?? 4150;
return new Writer(host, port, this.options?.producer?.connection);
}
private connectToNsq(producer: Writer) {
return new Promise<Writer>((resolve, reject) => {
producer.connect();
producer.once('ready', () => {
resolve(producer);
});
producer.once('error', (err) => {
reject(err);
});
});
}
}
已經完成了 NsqClient
,現在可以實際套用來測試看看,在 AppModule
透過 ClientsModule
的 register
來註冊 NsqClient
,這邊我們使用 customClass
來指定要以 NsqClient
來建立實例:
import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { AppController } from './app.controller';
import { NsqClient } from './clients';
@Module({
imports: [
ClientsModule.register([
{
name: 'NSQ_CLIENT',
customClass: NsqClient,
},
]),
],
controllers: [AppController],
})
export class AppModule {}
接著,調整 AppController
的部份,使用 @Inject
裝飾器注入 NSQ_CLIENT
,並建立 createOrder
方法來呼叫 NsqClient
的 emit
方法,Pattern 的部份指定為 order_created
,訊息的部份則帶入 { "msg": "Order Created!" }
:
import { Controller, Post, Inject } from '@nestjs/common';
import { NsqClient } from './clients';
@Controller()
export class AppController {
constructor(@Inject('NSQ_CLIENT') private readonly nsqClient: NsqClient) {}
@Post('orders')
createOrder() {
const data = { msg: 'Order created!' };
this.nsqClient.emit('order_created', data);
return data;
}
}
透過 Postman 進行測試,以 POST
方法存取 http://localhost:3000/orders,此時會在微服務應用程式的終端機看到由 Client 端傳送的訊息:
回顧一下今天的重點內容,我們首先簡單介紹了 NSQ 這套高效的訊息處理系統。在了解了相關知識後,我們使用 Custom Transporter 的技巧,設計了一套簡單的 NSQ Transporter。在這個過程中,我們利用 Server
提供的方法來幫助我們更容易地構建策略,也善用了 ClientProxy
的方法來包裝 NsqClient
。
NestJS 的 Transporter 教學到此告一段落,這幾天我們學習了 NestJS 各式內建的 Transporter,如 Redis、NATS、gRPC 等,甚至還實作了屬於我們自己的 Transporter。這些內容展現了 NestJS 在提升開發體驗方面的努力。下一篇文章,我們將進入 微服務的健康與可觀測性篇,敬請期待!